/*
  Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/

#include "mysqldump_tool_chain_maker.h"
#include "i_output_writer.h"
#include "file_writer.h"
#include "standard_writer.h"
#include "compression_lz4_writer.h"
#include "compression_zlib_writer.h"
#include "sql_formatter.h"
#include "mysqldump_tool_chain_maker_options.h"
#include <boost/algorithm/string.hpp>

using namespace Mysql::Tools::Dump;

/**
  Releases a chain previously handed out for the given chain ID.

  This implementation is a no-op. Every element built by create_chain() is
  recorded in m_all_created_elements and deleted in the destructor, so
  nothing is freed here.

  @param chain_id Identifier of the chain; unused.
  @param chain Chain entry point; unused.
*/
void Mysqldump_tool_chain_maker::delete_chain(
  uint64 chain_id, I_object_reader* chain)
{}

/**
  Selects (building it on first use) the processing chain for a dump task.

  The shared part of the chain is created lazily, once:
    output writer (file or standard output)
      <- optional compression writer (lz4 or zlib)
      <- Sql_formatter
      <- Mysql_object_reader (m_main_object_reader).
  Every element created here is appended to m_all_created_elements, which
  the destructor walks to delete them.

  @param chain_data Not used by this implementation.
  @param dump_task Task for which a chain entry point is requested.

  @return NULL when the task is to be skipped (row data of an excluded table
          type, rows skipped by options, object excluded from the dump) or
          when an unknown compression algorithm was requested;
          m_main_object_reader when m_default_parallelism is 0;
          otherwise the Object_queue assigned to the object's schema.
*/
I_object_reader* Mysqldump_tool_chain_maker::create_chain(
  Chain_data* chain_data, I_dump_task* dump_task)
{
  Table_rows_dump_task* rows_task= dynamic_cast<Table_rows_dump_task*>(
    dump_task);
  /*
    Row data is not dumped when rows are globally skipped or for the listed
    table types.
    NOTE(review): the negated compare_no_case_latin_with_db_string() call is
    presumably a strcmp-like "0 means equal" check for MRG_MyISAM - confirm
    against its declaration.
  */
  if (rows_task != NULL
    && (m_options->m_skip_rows_data
    || rows_task->get_related_table()->get_type() == "FEDERATED"
    || rows_task->get_related_table()->get_type() == "MRG_ISAM"
    || !this->compare_no_case_latin_with_db_string(
    "MRG_MyISAM", rows_task->get_related_table()->get_type())))
  {
    return NULL;
  }
  // Honour the include/exclude filters from the options.
  if (!m_options->is_object_included_in_dump(
    dynamic_cast<Abstract_data_object*>(
    dump_task->get_related_db_object())))
  {
    return NULL;
  }
  if (m_main_object_reader == NULL)
  {
    // First accepted task: build the shared writer/formatter/reader chain.
    I_output_writer* writer;
    if (m_options->m_result_file.has_value())
      writer= new File_writer(
      this->get_message_handler(), this->get_object_id_generator(),
      m_options->m_result_file.value());
    else
      writer= new Standard_writer(
      this->get_message_handler(), this->get_object_id_generator());
    m_all_created_elements.push_back(writer);
    if (m_options->m_compress_output_algorithm.has_value())
    {
      // The algorithm name is matched case-insensitively.
      std::string algorithm_name=
        m_options->m_compress_output_algorithm.value();
      boost::to_lower(algorithm_name);

      /*
        The same compression object is tracked through two pointers, one per
        interface that is needed below.
      */
      Abstract_output_writer_wrapper* compression_writer_as_wrapper= NULL;
      I_output_writer* compression_writer_as_writer= NULL;
      if (algorithm_name == "lz4")
      {
        Compression_lz4_writer* compression_writer=
          new Compression_lz4_writer(
            this->get_message_handler(), this->get_object_id_generator());
        compression_writer_as_wrapper= compression_writer;
        compression_writer_as_writer= compression_writer;
      }
      else if (algorithm_name == "zlib")
      {
        Compression_zlib_writer* compression_writer=
          new Compression_zlib_writer(
            this->get_message_handler(), this->get_object_id_generator(),
            Z_DEFAULT_COMPRESSION);
        compression_writer_as_wrapper= compression_writer;
        compression_writer_as_writer= compression_writer;
      }
      else
      {
        this->pass_message(Mysql::Tools::Base::Message_data(
          0, "Unknown compression method: " + algorithm_name,
          Mysql::Tools::Base::Message_type_error));
        /*
          No compression writer exists at this point, so continuing would
          dereference a NULL wrapper and store a NULL writer. The plain
          writer created above is already in m_all_created_elements and will
          be deleted by the destructor. m_main_object_reader stays NULL, so
          a later call goes through this setup (and this error) again.
        */
        return NULL;
      }

      // Compressed bytes flow into the plain writer created above.
      compression_writer_as_wrapper->register_output_writer(writer);
      writer= compression_writer_as_writer;
      m_all_created_elements.push_back(writer);
    }
    Sql_formatter* formatter= new Sql_formatter(
      this->get_connection_provider(),
      this->get_message_handler(), this->get_object_id_generator(),
      m_options->m_formatter_options);
    this->register_progress_watchers_in_child(formatter);
    formatter->register_output_writer(writer);
    m_all_created_elements.push_back(formatter);

    m_main_object_reader= new Mysql_object_reader(
      this->get_connection_provider(),
      this->get_message_handler(), this->get_object_id_generator(),
      m_options->m_object_reader_options);
    this->register_progress_watchers_in_child(m_main_object_reader);
    m_main_object_reader->register_data_formatter(formatter);
    m_all_created_elements.push_back(m_main_object_reader);
  }
  // With default parallelism of 0 the reader is returned directly, no queue.
  if (m_options->m_default_parallelism == 0)
  {
    return m_main_object_reader;
  }
  Abstract_data_object* data_object= dynamic_cast<Abstract_data_object*>(
    dump_task->get_related_db_object());

  // Objects that are not Abstract_data_object instances use queue ID 0.
  int object_queue_id= (data_object != NULL)
    ? (m_options->get_object_queue_id_for_schema(data_object->get_schema()))
    : 0;
  // Reuse the queue if one was already created for this ID.
  std::map<int, Object_queue*>::iterator it=
    m_object_queues.find(object_queue_id);
  if (it != m_object_queues.end())
  {
    return it->second;
  }
  Object_queue* queue= new Object_queue(
    this->get_message_handler(), this->get_object_id_generator(),
    m_options->get_object_queue_threads_count(object_queue_id),
    new Mysql::Instance_callback<void, bool, Mysqldump_tool_chain_maker>(
      this, &Mysqldump_tool_chain_maker::mysql_thread_callback));
  this->register_progress_watchers_in_child(queue);
  queue->register_object_reader(m_main_object_reader);
  m_all_created_elements.push_back(queue);
  m_object_queues.insert(std::make_pair(object_queue_id, queue));
  return queue;
}

/**
  Callback handed to every Object_queue built by create_chain().

  NOTE(review): presumably invoked by the queue's worker threads when they
  start and stop - confirm against Object_queue.

  @param is_starting true to call mysql_thread_init(), false to call
                     mysql_thread_end().
*/
void Mysqldump_tool_chain_maker::mysql_thread_callback(bool is_starting)
{
  if (!is_starting)
  {
    mysql_thread_end();
    return;
  }
  mysql_thread_init();
}

/**
  Deletes every chain element recorded in m_all_created_elements.

  Elements are deleted from the last one registered to the first, so an
  element is destroyed before those that were created ahead of it.
*/
Mysqldump_tool_chain_maker::~Mysqldump_tool_chain_maker()
{
  typedef std::vector<I_chain_element*>::size_type index_type;

  // Count down from size() so the unsigned index never goes below zero.
  for (index_type remaining= m_all_created_elements.size(); remaining > 0;
    --remaining)
  {
    delete m_all_created_elements[remaining - 1];
  }
}

/**
  Builds a chain maker; no chain elements are created until create_chain()
  is called.

  @param connection_provider Passed on to
                             Abstract_mysql_chain_element_extension.
  @param message_handler Passed on to both base classes.
  @param object_id_generator Passed on to Abstract_chain_element.
  @param options Stored in m_options; its m_mysql_chain_element_options
                 member is passed to the extension base, so the pointer
                 must not be NULL.
*/
Mysqldump_tool_chain_maker::Mysqldump_tool_chain_maker(
  I_connection_provider* connection_provider,
  Mysql::I_callable<bool, const Mysql::Tools::Base::Message_data&>*
    message_handler,
  Simple_id_generator* object_id_generator,
  Mysqldump_tool_chain_maker_options* options)
  : Abstract_chain_element(message_handler, object_id_generator),
    Abstract_mysql_chain_element_extension(
      connection_provider,
      message_handler,
      options->m_mysql_chain_element_options),
    m_options(options),
    m_main_object_reader(NULL)  // created on demand by create_chain()
{
}
